package com.mars.module.tool.utils;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Receives log lines over UDP, looks up the sender's asset type by source IP,
 * and hands each line to a worker pool that extracts data with the regular
 * expression registered for that asset type.
 *
 * <p>The configuration maps are filled in the constructor and only read
 * afterwards. {@link #startProcessing()} blocks the calling thread;
 * {@link #shutdown()} may be called from another thread (for example a JVM
 * shutdown hook) to stop the worker pool.
 */
public class LogProcessor {
    /** UDP port the listener binds to (514 is the conventional syslog port). */
    private static final int UDP_PORT = 514;
    /**
     * Maximum number of datagrams handled per second.
     */
    private static final int PARSE_RATE_LIMIT = 2000;
    /**
     * Pause between two datagrams that enforces {@link #PARSE_RATE_LIMIT}.
     * Kept in nanoseconds because the interval is shorter than one millisecond;
     * computing it in milliseconds with integer division yields 0.
     */
    private static final long PARSE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1) / PARSE_RATE_LIMIT;
    private static final int THREAD_POOL_SIZE = 5;
    /** Size of the receive buffer; longer datagrams are truncated by the socket. */
    private static final int MAX_DATAGRAM_SIZE = 1024;

    /**
     * Maps an asset's IP address to its asset type.
     */
    private final Map<String, String> assetConfig;
    /**
     * Maps an asset type to its parsing rule, compiled once up front so that
     * worker threads do not recompile the regex for every log line.
     */
    private final Map<String, Pattern> parsingRules;
    private final ExecutorService executorService;

    /**
     * Creates the worker pool and loads the built-in sample configuration.
     */
    public LogProcessor() {
        assetConfig = new HashMap<>();
        parsingRules = new HashMap<>();
        executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

        // Initial configuration; this could be loaded from a database instead.
        // NOTE(review): both entries use the same key, so the second put replaces
        // the first and "Router" is never selected — confirm the intended addresses.
        assetConfig.put("127.0.0.1", "Router");
        assetConfig.put("127.0.0.1", "Server");

        parsingRules.put("Router", Pattern.compile("\\[(.*?)\\] (.*?)-"));
        parsingRules.put("Server", Pattern.compile("(.*?) - (\\d+)"));
    }

    /**
     * Binds the UDP socket and processes incoming datagrams until the worker
     * pool has been shut down, the thread is interrupted, or an error occurs.
     * The socket is always closed when this method returns.
     *
     * <p>Because {@code receive} blocks, a shutdown is only noticed after the
     * next datagram arrives.
     */
    public void startProcessing() {
        try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
            byte[] buffer = new byte[MAX_DATAGRAM_SIZE];

            while (!executorService.isShutdown()) {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);

                // Decode immediately: the buffer is reused for the next datagram.
                String log = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
                System.out.println(log);
                String ipAddress = packet.getAddress().getHostAddress();

                String assetType = assetConfig.getOrDefault(ipAddress, "Unknown");
                Pattern parsingRule = parsingRules.get(assetType);

                // Senders without a registered rule are logged above but not parsed.
                if (parsingRule != null) {
                    executorService.execute(() -> parseAndInsert(log, ipAddress, parsingRule));
                }

                // Sleep to control the parsing rate
                TimeUnit.NANOSECONDS.sleep(PARSE_INTERVAL_NANOS);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers can see that the thread was interrupted.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the text captured by the first group of {@code pattern} in {@code log}.
     *
     * @param log     raw log line
     * @param pattern parsing rule to apply
     * @return the first capturing group of the first match, or empty when the
     *         pattern does not match, declares no capturing group, or the group
     *         did not take part in the match
     */
    static Optional<String> extractFirstGroup(String log, Pattern pattern) {
        Matcher matcher = pattern.matcher(log);
        if (matcher.groupCount() < 1 || !matcher.find()) {
            return Optional.empty();
        }
        return Optional.ofNullable(matcher.group(1));
    }

    /**
     * Parses one log line on a worker thread and forwards the extracted data.
     *
     * @param log       raw log line
     * @param ipAddress address of the sender (currently unused by the body)
     * @param rule      compiled parsing rule for the sender's asset type
     */
    private void parseAndInsert(String log, String ipAddress, Pattern rule) {
        System.out.println(log);
        Optional<String> parsedData = extractFirstGroup(log, rule);

        if (parsedData.isPresent()) {
            System.out.println("Parsed Data: " + parsedData.get());

            // Insert into Elasticsearch or perform other actions
            // For simplicity, just print the result here
            System.out.println("Inserting into Elasticsearch...");
        }
    }

    /**
     * Stops accepting new parse tasks and waits up to 10 seconds for running
     * ones; tasks still running after that are cancelled. Safe to call more
     * than once.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        LogProcessor logProcessor = new LogProcessor();
        // Register the hook first: startProcessing() blocks, so a hook added
        // after it would never be installed.
        Runtime.getRuntime().addShutdownHook(new Thread(logProcessor::shutdown));
        logProcessor.startProcessing();
    }
}
